/*
 * Copyright 2011-2012 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.sabayframework.mem.impl;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.sabayframework.mem.BigBuffer;

/**
 * {@link BigBufferAllocator} implementation with {@link BigBuffer} merging capabilities.
 * <p>
 * Every allocated or free region of the parent buffer is wrapped into a {@link LinkedBigBuffer} that knows
 * its direct neighbors. When a {@link BigBuffer} is freed, its neighbors are looked up and, if they are free
 * as well, merged with it into a single larger free region.
 * <p>
 * {@link #setMinSizeThreshold(int)} gives the free buffer's capacity at or under which no splitting is done.
 * {@link #setSizeRatioThreshold(double)} gives the size ratio (requested allocation / free buffer's capacity)
 * at or above which no splitting is done.
 * <p>
 * The free {@link BigBuffer}s are held in a {@link NavigableMap} whose keys delimit ranges of sizes. Instead of
 * keeping the free buffers sorted by capacity, all free buffers falling into the same range share one
 * collection: a free buffer is stored under the smallest key that is greater than or equal to its capacity
 * minus one. The ranges are computed by {@link #generateFreeSizesRange(Long)}, which can be overridden.
 * <p>
 * All manipulations of the free lists, of the neighbor links and of the parent buffer's position/limit are
 * performed while holding one internal {@link ReentrantLock}.
 */
public class MergingBigBufferAllocator extends AbstractBigBufferAllocator
{

    private static final double DEFAULT_SIZE_RATIO_THRESHOLD = 0.9;

    private static final int DEFAULT_MIN_SIZE_THRESHOLD = 128;

    // Multiplication factor between two consecutive generated size ranges
    private static final int SIZE_RANGE_FACTOR = 8;

    // Free regions, grouped by size range (see #getFreeLinkedBigBufferCollection)
    private final NavigableMap<Long, Collection<LinkedBigBuffer>> freePointers =
        new ConcurrentSkipListMap<Long, Collection<LinkedBigBuffer>>();

    // Regions currently handed out. The key is #getHash(BigBuffer).
    private final Map<Integer, LinkedBigBuffer> usedPointers = new ConcurrentHashMap<Integer, LinkedBigBuffer>();

    // Lock used instead of synchronized blocks to guarantee consistency when manipulating the lists of regions
    // and when slicing the (shared, stateful) parent buffer.
    private final Lock linkedStructureManipulationLock = new ReentrantLock();

    // The initial buffer, from which all the others are sliced
    private final BigBuffer parentBuffer;

    // Size ratio (requested size / buffer's capacity) at or above which a free buffer is returned without splitting
    private double sizeRatioThreshold = DEFAULT_SIZE_RATIO_THRESHOLD;

    // Capacity at or under which a free buffer is returned without splitting
    private int minSizeThreshold = DEFAULT_MIN_SIZE_THRESHOLD;

    // Tells if null is returned or a BufferOverflowException is thrown when no free buffer is large enough
    private boolean returnNullWhenBufferIsFull = false;

    /**
     * Creates an allocator that throws a {@link BufferOverflowException} when an allocation cannot be served.
     *
     * @param totalSize total size of the parent buffer.
     */
    public MergingBigBufferAllocator( final int totalSize )
    {
        this( totalSize, false );
    }

    /**
     * Creates an allocator backed by a new {@link UnsafeDirectBuffer} of the given size.
     *
     * @param totalSize total size of the parent buffer.
     * @param returnNullWhenBufferIsFull if <code>true</code>, {@link #allocate(long)} returns <code>null</code>
     *            instead of throwing a {@link BufferOverflowException} when no free buffer is large enough.
     */
    public MergingBigBufferAllocator( final int totalSize, boolean returnNullWhenBufferIsFull )
    {
        super();
        this.parentBuffer = new UnsafeDirectBuffer( totalSize );
        this.returnNullWhenBufferIsFull = returnNullWhenBufferIsFull;
        init();
    }

    /**
     * Initialization function. Creates one (empty) collection of free buffers per size range returned by
     * {@link #generateFreeSizesRange(Long)}, then registers an initial free region mapping the whole parent buffer.
     * <p>
     * Called from the constructor.
     */
    protected void init()
    {
        final Long totalSize = Long.valueOf( parentBuffer.capacity() );

        for ( final Long rangeLimit : generateFreeSizesRange( totalSize ) )
        {
            freePointers.put( rangeLimit, new LinkedHashSet<LinkedBigBuffer>() );
        }

        initFirstBuffer();
    }

    /**
     * Creates the first {@link LinkedBigBuffer}, covering the whole parent buffer, and inserts it into the
     * free lists. It has no neighbors.
     */
    private void initFirstBuffer()
    {
        parentBuffer.clear();
        final BigBuffer initialBuffer = parentBuffer.slice();
        final LinkedBigBuffer initialLinkedBuffer = new LinkedBigBuffer( 0, initialBuffer, null, null );

        insertLinkedBuffer( initialLinkedBuffer );
    }

    /**
     * Generates the free sizes' ranges. The ranges are used to find a free {@link BigBuffer} matching the
     * requested size without keeping the free buffers in a fully sorted structure.
     * <p>
     * This implementation starts at the current minimum size threshold and multiplies it by 8 while the result
     * does not exceed <code>totalSize</code>; <code>totalSize</code> itself is appended when it is not one of
     * the generated values, so that the returned list is never empty. A non-positive minimum size threshold
     * generates no intermediate range.
     *
     * @param totalSize total size of the parent buffer.
     * @return a list of all size's levels used by the allocator.
     */
    protected List<Long> generateFreeSizesRange( final Long totalSize )
    {
        final long total = totalSize.longValue();
        final List<Long> sizes = new ArrayList<Long>();

        // The "> 0" guard keeps the loop finite for a non-positive starting threshold
        long rangeLimit = minSizeThreshold;
        while ( rangeLimit > 0 && rangeLimit <= total )
        {
            sizes.add( Long.valueOf( rangeLimit ) );

            if ( rangeLimit > Long.MAX_VALUE / SIZE_RANGE_FACTOR )
            {
                // The next multiplication would overflow
                break;
            }
            rangeLimit *= SIZE_RANGE_FACTOR;
        }

        // If totalSize < minSizeThreshold or totalSize is not one of the generated limits,
        // we force adding an element to the list
        if ( !sizes.contains( totalSize ) )
        {
            sizes.add( totalSize );
        }

        return sizes;
    }

    /**
     * Returns a buffer to the allocator. The freed region is merged with its previous and/or next neighbor
     * when those are currently free, and the resulting region is put back into the free lists.
     * <p>
     * Freeing {@link EmptyBigBuffer#INSTANCE} is a no-op.
     *
     * @param buffer a buffer previously returned by {@link #allocate(long)}.
     * @throws IllegalArgumentException if the buffer is not currently registered as allocated by this allocator
     *             (for example because it was already freed).
     */
    @Override
    public void free( final BigBuffer buffer )
    {
        if ( buffer == EmptyBigBuffer.INSTANCE )
        {
            return;
        }

        linkedStructureManipulationLock.lock();
        try
        {
            // Done under the lock so that a concurrent clear() cannot slip in between the removal from the
            // used set and the re-insertion into the free lists.
            LinkedBigBuffer returningLinkedBuffer = usedPointers.remove( getHash( buffer ) );

            if ( returningLinkedBuffer == null )
            {
                // Hu ? returned twice ? Not returned at the right place ?
                throw new IllegalArgumentException( "The buffer " + buffer + " seems not to belong to this allocator" );
            }

            // If the previous neighbor is in the free lists, it can be merged
            final LinkedBigBuffer previous = returningLinkedBuffer.getBefore();
            if ( previous != null && isFree( previous ) )
            {
                returningLinkedBuffer = mergePointer( previous, returningLinkedBuffer );
            }

            // Same for the next neighbor (of the possibly already merged region)
            final LinkedBigBuffer following = returningLinkedBuffer.getAfter();
            if ( following != null && isFree( following ) )
            {
                returningLinkedBuffer = mergePointer( returningLinkedBuffer, following );
            }

            insertLinkedBuffer( returningLinkedBuffer );
        }
        finally
        {
            linkedStructureManipulationLock.unlock();
        }
    }

    /**
     * Allocates a buffer of the requested size. The first free buffer found whose capacity is sufficient is
     * used; it is split into the returned buffer and a remaining free buffer when its capacity is above the
     * minimum size threshold and the ratio (size / capacity) is under the size ratio threshold. Otherwise
     * the whole free buffer is returned, with its limit set to <code>size</code>.
     *
     * @param size the number of bytes requested.
     * @return {@link EmptyBigBuffer#INSTANCE} if <code>size</code> is 0; the allocated buffer otherwise;
     *         <code>null</code> if no free buffer is large enough and the allocator was configured to return
     *         <code>null</code> in that case.
     * @throws IllegalArgumentException if <code>size</code> is negative.
     * @throws BufferOverflowException if no free buffer is large enough and the allocator was not configured
     *             to return <code>null</code>.
     */
    @Override
    public BigBuffer allocate( final long size )
    {
        if ( size == 0 )
        {
            return EmptyBigBuffer.INSTANCE;
        }
        if ( size < 0 )
        {
            // Rejected before touching the free lists: a negative size would otherwise remove a free buffer
            // and then fail while slicing, leaving the structure inconsistent.
            throw new IllegalArgumentException( "Cannot allocate a negative size: " + size );
        }

        linkedStructureManipulationLock.lock();
        try
        {
            // Only the size ranges that may contain a buffer with a capacity >= size are scanned
            // (same "capacity - 1" convention as in #getFreeLinkedBigBufferCollection).
            final SortedMap<Long, Collection<LinkedBigBuffer>> candidateRanges = freePointers.tailMap( size - 1 );

            for ( final Collection<LinkedBigBuffer> freeBuffers : candidateRanges.values() )
            {
                final Iterator<LinkedBigBuffer> freeBufferIterator = freeBuffers.iterator();

                while ( freeBufferIterator.hasNext() )
                {
                    final LinkedBigBuffer linkedBuffer = freeBufferIterator.next();
                    final long capacity = linkedBuffer.getBuffer().capacity();

                    if ( capacity < size )
                    {
                        continue;
                    }

                    // Remove this element from the free collection. The iterator is not used afterwards,
                    // so inserting the remaining part of a split into the same collection is safe.
                    freeBufferIterator.remove();

                    final LinkedBigBuffer returnedLinkedBuffer;

                    if ( capacity > minSizeThreshold && ( 1.0 * size / capacity ) < sizeRatioThreshold )
                    {
                        returnedLinkedBuffer = splitLinkedBuffer( linkedBuffer, size );
                    }
                    else
                    {
                        // If the buffer is not split, set the limit accordingly
                        returnedLinkedBuffer = linkedBuffer;
                        returnedLinkedBuffer.getBuffer().clear();
                        returnedLinkedBuffer.getBuffer().limit( size );
                    }

                    usedPointers.put( getHash( returnedLinkedBuffer.getBuffer() ), returnedLinkedBuffer );

                    return returnedLinkedBuffer.getBuffer();
                }
            }

            if ( returnNullWhenBufferIsFull )
            {
                return null;
            }

            throw new BufferOverflowException();
        }
        finally
        {
            linkedStructureManipulationLock.unlock();
        }
    }

    /**
     * Forgets every allocated buffer, empties the free lists and registers again a single free region mapping
     * the whole parent buffer. Performed while holding the structure lock, as it modifies the free lists and
     * the parent buffer's position/limit.
     */
    @Override
    public void clear()
    {
        linkedStructureManipulationLock.lock();
        try
        {
            usedPointers.clear();

            for ( final Collection<LinkedBigBuffer> freeBuffers : freePointers.values() )
            {
                freeBuffers.clear();
            }

            initFirstBuffer();
        }
        finally
        {
            linkedStructureManipulationLock.unlock();
        }
    }

    /**
     * Splits a free region (already removed from the free lists) in two adjacent regions: the first one, of the
     * requested size, is returned; the second one, holding the rest, is inserted into the free lists.
     * Neighbor links of the surrounding regions are updated. Must be called with the structure lock held.
     *
     * @param linkedBuffer the region to split.
     * @param size the size of the first part.
     * @return the region of <code>size</code> bytes starting at the offset of <code>linkedBuffer</code>.
     */
    private LinkedBigBuffer splitLinkedBuffer( final LinkedBigBuffer linkedBuffer, final long size )
    {
        final long offset = linkedBuffer.getOffset();
        final long capacity = linkedBuffer.getBuffer().capacity();
        final LinkedBigBuffer previous = linkedBuffer.getBefore();
        final LinkedBigBuffer following = linkedBuffer.getAfter();

        // First part: the buffer that will be returned to the caller
        final LinkedBigBuffer returnedLinkedBuffer =
            new LinkedBigBuffer( offset, sliceParent( offset, size ), previous, null );

        if ( previous != null )
        {
            previous.setAfter( returnedLinkedBuffer );
        }

        // Second part: the remaining buffer, reinserted in the corresponding free collection
        final LinkedBigBuffer remainingLinkedBuffer =
            new LinkedBigBuffer( offset + size, sliceParent( offset + size, capacity - size ),
                                 returnedLinkedBuffer, following );

        if ( following != null )
        {
            following.setBefore( remainingLinkedBuffer );
        }

        returnedLinkedBuffer.setAfter( remainingLinkedBuffer );

        insertLinkedBuffer( remainingLinkedBuffer );

        return returnedLinkedBuffer;
    }

    /**
     * Slices the parent buffer. Resets the parent buffer, then moves its position to <code>offset</code> and
     * its limit to <code>offset + length</code> before slicing. As this changes the state of the shared parent
     * buffer, it must be called with the structure lock held.
     *
     * @param offset start of the slice in the parent buffer.
     * @param length length of the slice.
     * @return the slice produced by the parent buffer.
     */
    private BigBuffer sliceParent( final long offset, final long length )
    {
        parentBuffer.clear();
        parentBuffer.position( offset );
        parentBuffer.limit( offset + length );
        return parentBuffer.slice();
    }

    /**
     * Adds a region to the free collection matching its capacity.
     */
    private void insertLinkedBuffer( final LinkedBigBuffer linkedBuffer )
    {
        getFreeLinkedBigBufferCollection( linkedBuffer ).add( linkedBuffer );
    }

    /**
     * Tells whether a region is currently free, i.e. present in the free collection matching its capacity.
     */
    private boolean isFree( final LinkedBigBuffer linkedBuffer )
    {
        return getFreeLinkedBigBufferCollection( linkedBuffer ).contains( linkedBuffer );
    }

    /**
     * Finds the free collection in charge of the given region: the one registered under the smallest key
     * greater than or equal to the region's capacity minus one.
     * <p>
     * NOTE(review): relies on the size ranges containing a key large enough for every region, which holds for
     * the ranges built by this class as they always include the total size - confirm for overriding
     * implementations of {@link #generateFreeSizesRange(Long)}.
     */
    private Collection<LinkedBigBuffer> getFreeLinkedBigBufferCollection( final LinkedBigBuffer linkedBuffer )
    {
        final Long size = Long.valueOf( linkedBuffer.getBuffer().capacity() - 1 );
        final Map.Entry<Long, Collection<LinkedBigBuffer>> bufferCollectionEntry =
            freePointers.ceilingEntry( size );
        return bufferCollectionEntry.getValue();
    }

    /**
     * Merges two adjacent regions into a new one covering both. The neighbors of the two regions are relinked
     * to the new region and both regions are removed from the free lists (a no-op for the one that was not
     * free). The new region is NOT inserted into the free lists: this is left to the caller.
     * Must be called with the structure lock held.
     *
     * @param first the region with the lowest offset.
     * @param next the region directly following <code>first</code>.
     * @return the new region, starting at the offset of <code>first</code>.
     */
    private LinkedBigBuffer mergePointer( final LinkedBigBuffer first, final LinkedBigBuffer next )
    {
        final long mergedCapacity = first.getBuffer().capacity() + next.getBuffer().capacity();
        final BigBuffer newBigBuffer = sliceParent( first.getOffset(), mergedCapacity );

        final LinkedBigBuffer newLinkedBigBuffer =
            new LinkedBigBuffer( first.getOffset(), newBigBuffer, first.getBefore(), next.getAfter() );

        if ( first.getBefore() != null )
        {
            first.getBefore().setAfter( newLinkedBigBuffer );
        }

        if ( next.getAfter() != null )
        {
            next.getAfter().setBefore( newLinkedBigBuffer );
        }

        // Remove the two regions from their corresponding free lists.
        getFreeLinkedBigBufferCollection( first ).remove( first );
        getFreeLinkedBigBufferCollection( next ).remove( next );

        return newLinkedBigBuffer;
    }

    /**
     * Sets the size ratio (requested size / free buffer's capacity) at or above which a free buffer is
     * returned as a whole instead of being split.
     *
     * @param sizeRatioThreshold the new ratio.
     */
    public void setSizeRatioThreshold( final double sizeRatioThreshold )
    {
        this.sizeRatioThreshold = sizeRatioThreshold;
    }

    /**
     * Sets the capacity at or under which a free buffer is returned as a whole instead of being split.
     * <p>
     * In this class the size ranges are generated once, at construction time: changing the threshold
     * afterwards only affects the splitting decision.
     *
     * @param minSizeThreshold the new threshold.
     */
    public void setMinSizeThreshold( final int minSizeThreshold )
    {
        this.minSizeThreshold = minSizeThreshold;
    }

    /**
     * Chooses what {@link #allocate(long)} does when no free buffer is large enough.
     *
     * @param returnNullWhenBufferIsFull <code>true</code> to return <code>null</code>, <code>false</code> to
     *            throw a {@link BufferOverflowException}.
     */
    public void setReturnNullWhenBufferIsFull( boolean returnNullWhenBufferIsFull )
    {
        this.returnNullWhenBufferIsFull = returnNullWhenBufferIsFull;
    }

    /**
     * @return the capacity of the parent buffer.
     */
    @Override
    public long getCapacity()
    {
        return parentBuffer.capacity();
    }

    /**
     * A region of the parent buffer (offset + slice) together with links to the regions located directly
     * before and after it. Instances are compared by identity.
     */
    private static class LinkedBigBuffer
    {
        // Offset of the region in the parent buffer
        private final long offset;

        // Slice of the parent buffer covering the region
        private final BigBuffer buffer;

        // Neighbor located directly before this region, null if none
        private volatile LinkedBigBuffer before;

        // Neighbor located directly after this region, null if none
        private volatile LinkedBigBuffer after;

        public LinkedBigBuffer( final long offset, final BigBuffer buffer, final LinkedBigBuffer before,
                                 final LinkedBigBuffer after )
        {
            this.offset = offset;
            this.buffer = buffer;
            setBefore( before );
            setAfter( after );
        }

        public BigBuffer getBuffer()
        {
            return buffer;
        }

        public long getOffset()
        {
            return offset;
        }

        public LinkedBigBuffer getBefore()
        {
            return before;
        }

        public void setBefore( final LinkedBigBuffer before )
        {
            this.before = before;
        }

        public LinkedBigBuffer getAfter()
        {
            return after;
        }

        public void setAfter( final LinkedBigBuffer after )
        {
            this.after = after;
        }
    }

    /**
     * Clears the allocator, then destroys the parent buffer. Any exception raised while destroying the parent
     * buffer is ignored.
     */
    @Override
    public void close()
        throws IOException
    {
        clear();

        try
        {
            parentBuffer.destroy();
        }
        catch ( Exception ignored )
        {
            // Deliberately ignored: closing is best-effort ("quiet mode"), a failure to destroy the parent
            // buffer is not propagated to the caller.
        }
    }


}